package sink;

import beans.SenSorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;
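
/*
 * A minimal sketch of what the SenSorReading bean is assumed to look like
 * (the real class lives in the beans package and is not shown here; the field
 * names are guesses, only the constructor signature and toString() are implied
 * by the code below):
 *
 *   public class SenSorReading {
 *       private final String id;
 *       private final Long timestamp;
 *       private final Double temperature;
 *
 *       public SenSorReading(String id, Long timestamp, Double temperature) {
 *           this.id = id;
 *           this.timestamp = timestamp;
 *           this.temperature = temperature;
 *       }
 *
 *       @Override
 *       public String toString() {
 *           return id + "," + timestamp + "," + temperature;
 *       }
 *   }
 */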

/**
 * Reads data from Kafka, processes each record, and writes the result back to Kafka.
 */
public class SinkTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "a:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
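
        // For a quick local test you could publish CSV records to the "first"
        // topic with Kafka's console producer (assuming a 0.11-era broker
        // reachable at a:9092):
        //
        //   bin/kafka-console-producer.sh --broker-list a:9092 --topic first
        //   > sensor_1,1547718199,35.8
        //
        // The values above are illustrative; only the "id,timestamp,temperature"
        // layout matters to the map() below.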

        DataStreamSource<String> inputStream = env.addSource(
                new FlinkKafkaConsumer011<String>("first", new SimpleStringSchema(), properties));

        /*
         * Converting each record to a String with toString() here is a shortcut:
         * it lets the SimpleStringSchema passed to the Kafka producer below take
         * care of serialization.
         */
        SingleOutputStreamOperator<String> dataStream = inputStream.map(line -> {
            // Each input line is expected as "id,timestamp,temperature" CSV
            String[] fields = line.split(",");
            return new SenSorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2])).toString();
        });

        dataStream.addSink(new FlinkKafkaProducer011<String>("a:9092", "first", new SimpleStringSchema()));

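        // A possible variant (a sketch, not what this job does): keep the stream
        // typed as SenSorReading and let the producer serialize it, so map()
        // would not need toString(). This assumes the brokerList/topic
        // constructor of FlinkKafkaProducer011 and would also need an import of
        // org.apache.flink.api.common.serialization.SerializationSchema:
        //
        //   SingleOutputStreamOperator<SenSorReading> typed = inputStream.map(line -> {
        //       String[] f = line.split(",");
        //       return new SenSorReading(f[0], Long.parseLong(f[1]), Double.parseDouble(f[2]));
        //   });
        //   typed.addSink(new FlinkKafkaProducer011<>("a:9092", "first",
        //           (SerializationSchema<SenSorReading>) element -> element.toString().getBytes()));
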
        env.execute();
    }
}
